package cn.tiakon.dmp.etl

import cn.tiakon.dmp.beans.Log
import cn.tiakon.dmp.untils.{ContextUtils, Utils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
  * Converts raw log files (.logs) into Parquet files.
  * Each raw log line is parsed and wrapped in the `Log` case class.
  * @author Tiakon
  *         2018/3/28 19:49
  */
object Logs2ParquetPlus {

  /**
    * Entry point. Expects exactly two arguments:
    * the raw-log input path and the Parquet output path.
    */
  def main(args: Array[String]): Unit = {

    // Validate arguments BEFORE creating the SparkContext: a bad invocation
    // should fail fast without paying cluster-startup cost, and sys.exit(1)
    // must not leave a live context behind.
    if (args.length != 2) {
      println(
        """
          |
          |参数异常
          |dataInputPath、outDataPath
          |
        """.stripMargin)
      sys.exit(1)
    }
    val Array(dataInputPath, outDataPath) = args

    val sc = ContextUtils.getSparkContext()
    val sqlContext: SQLContext = new SQLContext(sc)

    val rdd: RDD[String] = sc.textFile(dataInputPath)

    // split limit -1 keeps trailing empty fields; drop records with fewer
    // than the 85 columns the Log case class expects.
    val splited: RDD[Array[String]] = rdd.map(_.split(",", -1)).filter(arr => arr.length >= 85)

    val logRDD: RDD[Log] = splited.map(Log(_))

    val dataFrame: DataFrame = sqlContext.createDataFrame(logRDD)
    dataFrame.registerTempTable("logs")

    // Remove any previous output exactly once (the original called this twice)
    // so the parquet write below does not fail on an existing path.
    Utils.deleteFileByCoreSite(sc, outDataPath)

    dataFrame.write.parquet(outDataPath)

    // Sanity check: read back through the temp view and print a sample.
    val selected: DataFrame = sqlContext.sql("select * from logs")
    selected.show()

    sc.stop()
  }
}
